package com.elastic.search.listener;

import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.elastic.search.event.UpdateCanalEvent;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * Listener that propagates canal UPDATE row events to Elasticsearch.
 *
 * <p>Updating ES blindly, combined with a canal rollback, can lead to an endless loop,
 * so the document is queried first and only updated when it already exists in the index.
 *
 * @author huangdeyao
 */
@Component
public class UpdateCanalListener extends AbstractCanalListener<UpdateCanalEvent> {
    private static final Logger logger = LoggerFactory.getLogger(UpdateCanalListener.class);
    @Autowired
    RestHighLevelClient restHighLevelClient;

    /**
     * Applies the "after" image of an updated row to the matching Elasticsearch document.
     *
     * <p>The row's after-columns are converted to a map via {@code parseColumnsToMap}; the value
     * stored under {@code ES_ID} is used as the document id. The document is fetched first and
     * the partial update is only sent when the fetch returned a source. I/O failures of either
     * request are logged (with stack trace) and not rethrown.
     *
     * @param database name of the source database (only used for logging here)
     * @param table    name of the source table (only used for logging here)
     * @param index    Elasticsearch index that receives the update
     * @param rowData  canal row change whose after-columns hold the new values
     */
    @Override
    protected void doSync(String database, String table, String index, RowData rowData) {
        logger.debug("===============================Update==========================================");
        logger.info("database:{},table:{},index:{},rowData:{}", database, table, index, rowData);

        Map<String, Object> jsonMap = parseColumnsToMap(rowData.getAfterColumnsList());
        Object idValue = jsonMap.get(ES_ID);
        if (idValue == null) {
            // Without an id column there is no document to address.
            logger.info("=========实时同步数据 update 没有表id字段======");
            return;
        }
        String id = idValue.toString();

        // Look the document up first: only existing documents are updated (see class comment).
        GetResponse getResponse = null;
        try {
            getResponse = restHighLevelClient.get(new GetRequest(index, id), RequestOptions.DEFAULT);
        } catch (IOException e) {
            // Route the failure through the logger instead of stderr; fall through so the
            // "not found" branch below still reports that no update was performed.
            logger.error("Failed to fetch document before update, index:{},id:{}", index, id, e);
        }
        if (getResponse == null || getResponse.getSource() == null) {
            logger.info("=========实时同步数据 update getResponse/getResponse.getSource() maybe is null ======");
            return;
        }

        UpdateRequest updateRequest = new UpdateRequest(index, id);
        updateRequest.doc(jsonMap);
        updateRequest.timeout("1s");
        try {
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            logger.info("id==>{}======== 实时同步数据 update 结果 ======>{}", id, updateResponse.status());
            logger.info("{}", updateResponse);
        } catch (IOException e) {
            // Log at error level with the exception attached so the stack trace reaches the log.
            logger.error("=========实时同步数据 update 异常======", e);
        }
    }
}
